public static void main(String[] args) {
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() ->
System.out.println("Background thread finished."));
System.out.println("Main thread finished.");
}
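The program above prints both lines but never exits, because the executor's worker thread is not a daemon. A minimal sketch of the explicit-shutdown alternative; the class name `GracefulShutdown` and the 5-second timeout are assumptions made for illustration:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class GracefulShutdown {
    // Runs one background task, then shuts the pool down.
    // Returns true if the pool terminated within the timeout.
    static boolean runAndShutdown() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(() -> System.out.println("Background thread finished."));
        executor.shutdown(); // no new tasks; already submitted ones still run
        try {
            if (executor.awaitTermination(5, TimeUnit.SECONDS)) {
                return true;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
        }
        executor.shutdownNow(); // timed out or interrupted: cancel what is left
        return false;
    }

    public static void main(String[] args) {
        runAndShutdown();
        System.out.println("Main thread finished.");
    }
}
```

With the pool shut down, no non-daemon thread is left and the JVM can exit once `main` returns.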
ExecutorService executor = MoreExecutors.getExitingExecutorService(
(ThreadPoolExecutor) yourExecutor);
which uses daemon threads (and also registers a shutdown hook that waits for the pool to finish), roughly:
executor.setThreadFactory(
new ThreadFactoryBuilder().setDaemon(true).build());
while ((line = in.readLine()) != null) {
try {
// ...
} catch (InterruptedException e) {
e.printStackTrace(); // or ignore
}
}
while (!Thread.currentThread().isInterrupted() && ...) {
try {
// ...
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
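The second loop above can be exercised end to end. A small sketch, assuming a hypothetical `InterruptibleWorker` whose blocking work is simulated with `Thread.sleep`:

```java
class InterruptibleWorker implements Runnable {
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Thread.sleep(10); // stands in for blocking work
            } catch (InterruptedException e) {
                // sleep() cleared the flag; set it again so the loop condition sees it
                Thread.currentThread().interrupt();
            }
        }
    }

    // Starts a worker, interrupts it, and reports whether it stopped.
    static boolean stopsWhenInterrupted() {
        Thread worker = new Thread(new InterruptibleWorker(), "interruptible-worker");
        worker.start();
        worker.interrupt();
        try {
            worker.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }
}
```

If the `catch` block only printed the stack trace, as in the first loop, the flag would stay cleared and the worker would keep running.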
private static final Message POISON_PILL = new Message(-1);
while (true) {
Message message = queue.take();
if (POISON_PILL.equals(message)) {
return;
}
// handle message here ...
}
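A self-contained version of the poison-pill loop, as a sketch. It assumes `String` messages instead of the `Message` type above and compares the pill by identity, so that no regular message can be mistaken for it; `PoisonPillDemo` and `consume` are hypothetical names:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class PoisonPillDemo {
    // A dedicated instance: no other String is == to it.
    private static final String POISON_PILL = new String("POISON_PILL");

    // Feeds the input to a consumer thread and returns what it handled.
    static List<String> consume(List<String> input) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> handled = new ArrayList<>();
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String message = queue.take();
                    if (message == POISON_PILL) {
                        return; // producer asked us to stop
                    }
                    handled.add(message);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        queue.addAll(input);
        queue.add(POISON_PILL); // always last, so earlier messages are drained first
        try {
            consumer.join(); // join() makes the consumer's writes visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handled;
    }
}
```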
ExecutorService executor = Executors.newFixedThreadPool(8);
while (true) {
executor.execute(() -> {
LOG.info(Thread.currentThread().getName() + " is working");
Uninterruptibles.sleepUninterruptibly(1L, TimeUnit.SECONDS);
if (i.getAndIncrement() % 4 == 0) { // inject 25% errors
throw new RuntimeException();
}
});
}
pool-1-thread-1 is working
pool-1-thread-2 is working
Exception in thread "pool-1-thread-1" java.lang.RuntimeException
at UnstoppablePool.lambda$main$0(UnstoppablePool.java:27)
pool-1-thread-4 is working
pool-1-thread-5 is working
pool-1-thread-6 is working
pool-1-thread-8 is working
Exception in thread "pool-1-thread-6" java.lang.RuntimeException
at UnstoppablePool.lambda$main$0(UnstoppablePool.java:27)
pool-1-thread-9 is working
pool-1-thread-10 is working
new Runnable() {
public void run() {
try {
// logic goes here
} catch (Throwable e) {
// error handling
}
}
}
At least log an error:
Thread.setDefaultUncaughtExceptionHandler(
new Thread.UncaughtExceptionHandler() {
public void uncaughtException(Thread t, Throwable e) {
LOG.error("Thread [" + t + "] died abruptly", e);
}
});
or bail out:
Thread.setDefaultUncaughtExceptionHandler(
UncaughtExceptionHandlers.systemExit());
ExecutorService pool = Executors.newSingleThreadExecutor();
while (true) {
i.getAndIncrement();
pool.execute(() -> {
LOG.info("Tasks in queue: " + i.get());
LOG.info("Slow consumer working...");
Uninterruptibles.sleepUninterruptibly(5L, TimeUnit.SECONDS);
});
}
Tasks in queue: 10406024
Slow consumer working...
java.lang.OutOfMemoryError: GC overhead limit exceeded
eshelestovich:/$ java TooManyThreads
1
2
...
2023
2024
Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
eshelestovich:/$ sysctl kern.num_taskthreads
kern.num_taskthreads: 2048
eshelestovich:/$ java -server -XX:+UnlockDiagnosticVMOptions \
-XX:+PrintFlagsFinal -XX:+PrintCommandLineFlags \
-version | grep ThreadStackSize
intx ThreadStackSize = 1024
-XX:ParallelGCThreads=8
-XX:ConcGCThreads=2
new ThreadPoolExecutor(
8, 32,
60L, TimeUnit.SECONDS, // keepAliveTime
new ArrayBlockingQueue<>(1024),
new ThreadPoolExecutor.CallerRunsPolicy()); // backpressure
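To see the backpressure effect of `CallerRunsPolicy`, here is a sketch with a deliberately tiny pool (1 thread, queue capacity 1; both sizes are assumptions chosen to force saturation, and `BackpressureDemo` is a hypothetical name). The third task cannot be queued, so the submitting thread runs it itself:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class BackpressureDemo {
    // Returns the name of the thread that ran the task the pool had no room for.
    static String overflowTaskThread() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1,
                60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<String> ranOn = new AtomicReference<>();

        pool.execute(() -> {              // task 1: occupies the only worker
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        pool.execute(() -> { });          // task 2: fills the queue
        pool.execute(() ->                // task 3: rejected, so the caller runs it
                ranOn.set(Thread.currentThread().getName()));

        release.countDown();
        pool.shutdown();
        return ranOn.get();
    }
}
```

While the caller is busy running the overflow task it cannot submit more work, which is what slows the producer down.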
For CPU-bound tasks, Brian Goetz recommends:
threads = number of CPUs + 1
For mixed workloads, Subramaniam & Goetz agree on:
threads = number of CPUs * (1 + wait time / service time)
E.g. with an 8-core CPU and a workload that spends 30% of its time waiting on I/O:
8 * (1 + 30 / 70) ≈ 11 threads
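The sizing formula as code, for plugging in your own numbers. A sketch only: `PoolSizing` and `threads` are hypothetical names, and rounding to the nearest integer is an assumption that matches the 11 above.

```java
class PoolSizing {
    // threads = number of CPUs * (1 + wait time / service time)
    static int threads(int cpus, double waitTime, double serviceTime) {
        return (int) Math.round(cpus * (1 + waitTime / serviceTime));
    }

    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();
        // 30% of the time waiting on I/O, 70% computing
        System.out.println(threads(cpus, 30, 70));
    }
}
```

Only the ratio of wait time to service time matters, so percentages, milliseconds, or any other common unit work.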
If 95% of the program is parallelizable, the theoretical maximum speedup is only 20x (Amdahl's law).
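The 20x ceiling follows from Amdahl's law, speedup = 1 / ((1 - p) + p / n), as n grows without bound. A small sketch with hypothetical names `Amdahl`, `speedup`, and `maxSpeedup`:

```java
class Amdahl {
    // Speedup with n workers when a fraction p of the work is parallelizable.
    static double speedup(double p, int n) {
        return 1.0 / ((1.0 - p) + p / n);
    }

    // Limit as n grows without bound: only the serial part remains.
    static double maxSpeedup(double p) {
        return 1.0 / (1.0 - p);
    }

    public static void main(String[] args) {
        System.out.println(speedup(0.95, 8));   // roughly 5.9 on 8 cores
        System.out.println(maxSpeedup(0.95));   // roughly 20
    }
}
```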
To sustain a throughput of 500 req/sec at an average latency of 150 ms, we need 75 parallel workers (threads/processes). This is Little's law: workers = throughput * latency.
It works the other way around too!
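Both directions of that calculation, as a sketch. `LittlesLaw` and its method names are hypothetical; latency is taken in whole milliseconds to keep the arithmetic exact.

```java
class LittlesLaw {
    // workers = throughput (req/sec) * latency (sec), rounded up
    static int workersNeeded(int requestsPerSecond, int latencyMillis) {
        return (int) Math.ceil(requestsPerSecond * latencyMillis / 1000.0);
    }

    // The other way around: throughput a fixed number of workers can sustain.
    static double maxThroughput(int workers, int latencyMillis) {
        return workers * 1000.0 / latencyMillis;
    }

    public static void main(String[] args) {
        System.out.println(workersNeeded(500, 150)); // 75
        System.out.println(maxThroughput(75, 150));  // 500.0
    }
}
```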
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.92-b14 mixed mode):
"pool-1-thread-32" #42 prio=5
"pool-1-thread-31" #41 prio=5
"pool-1-thread-30" #40 prio=5
"pool-1-thread-29" #39 prio=5
"pool-1-thread-28" #38 prio=5
"pool-1-thread-27" #37 prio=5
"pool-1-thread-26" #36 prio=5
...
| Complicates troubleshooting |
ThreadFactory factory = new ThreadFactoryBuilder()
.setNameFormat("my-kafka-pool-%d").build();
ExecutorService pool = Executors.newFixedThreadPool(4, factory);
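If Guava is not on the classpath, the same naming can be done with a plain JDK `ThreadFactory`. A sketch; `NamedThreadFactory` is a hypothetical class, and numbering from 0 is a choice made here:

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger();

    NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable task) {
        // e.g. my-kafka-pool-0, my-kafka-pool-1, ...
        return new Thread(task, prefix + "-" + counter.getAndIncrement());
    }
}
```

Usage: `Executors.newFixedThreadPool(4, new NamedThreadFactory("my-kafka-pool"))`.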
Threads section summary…
private Object[] items = new Object[] {};
private ArrayCopy add(Object newItem) {
synchronized (items) {
Object[] newArray = new Object[items.length + 1];
...
items = newArray; // lock object mutation
}
return this;
}
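The problem above is that `items` is reassigned inside the block, so later callers synchronize on a different object than earlier ones. One possible fix, sketched with a dedicated `final` lock (the `size()` accessor is added here only for illustration):

```java
import java.util.Arrays;

class ArrayCopy {
    private final Object lock = new Object(); // never reassigned
    private Object[] items = new Object[0];

    ArrayCopy add(Object newItem) {
        synchronized (lock) {
            Object[] newArray = Arrays.copyOf(items, items.length + 1);
            newArray[items.length] = newItem;
            items = newArray; // safe: the lock object itself does not change
        }
        return this;
    }

    int size() {
        synchronized (lock) {
            return items.length;
        }
    }
}
```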
| Bug in Tomcat (46990) |
private final String lock = "LOCK";
public void doSomething() {
synchronized (lock) { ... }
}
private final Integer lock = 42;
public void doSomething() {
synchronized (lock) { ... }
}
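Why these two locks are dangerous: string literals are interned and small boxed integers are cached, so unrelated classes end up sharing one monitor. A sketch that makes this visible; `ServiceA` and `ServiceB` are hypothetical stand-ins for unrelated classes:

```java
class SharedLockDemo {
    static class ServiceA {
        final String lock = "LOCK";
        final Integer intLock = 42;
        final Object privateLock = new Object();
    }

    static class ServiceB {
        final String lock = "LOCK";
        final Integer intLock = 42;
        final Object privateLock = new Object();
    }

    // Same interned literal: both classes synchronize on one object.
    static boolean stringLocksShared() {
        return new ServiceA().lock == new ServiceB().lock;
    }

    // Autoboxing 42 goes through the Integer cache: again one object.
    static boolean integerLocksShared() {
        return new ServiceA().intLock == new ServiceB().intLock;
    }

    // A fresh Object per instance is never shared by accident.
    static boolean privateLocksShared() {
        return new ServiceA().privateLock == new ServiceB().privateLock;
    }
}
```

A `private final Object lock = new Object();` field avoids the accidental sharing.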
| Bug in Jetty |
private static SysCtx ctx; // static shared state
public synchronized SysCtx getSysCtx() { // two intrinsic locks
if (ctx == null) {
ctx = new SysCtx();
}
return ctx;
}
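In the snippet above the instance-level `synchronized` locks on `this`, so two instances can both see `ctx == null` and initialize the static field twice. One possible fix is to lock on the class instead. A sketch, with `SysCtx` reduced to an empty placeholder and `SysCtxRegistry` a hypothetical name:

```java
class SysCtxRegistry {
    static final class SysCtx { }

    private static SysCtx ctx; // static shared state

    // static synchronized locks on SysCtxRegistry.class, the same monitor
    // for every caller, so the check-then-create sequence is atomic.
    static synchronized SysCtx getSysCtx() {
        if (ctx == null) {
            ctx = new SysCtx();
        }
        return ctx;
    }
}
```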
| Bug in Android (12015587) |
synchronized (this) {
Future<User> user = remoteService.getUser(id); // network I/O
// ...
}
| Bug in Log4j (41214). Set timeouts explicitly! |
Lock Coarsening, -XX:+EliminateLocks
public String getNames(StringBuffer sb) {
sb.append("Alice");
sb.append("Claire");
return sb.toString();
}
Lock Elision, -XX:+DoEscapeAnalysis
public String getNames() {
StringBuffer sb = new StringBuffer();
sb.append("Alice");
sb.append(someName);
sb.append("Barbara");
return sb.toString();
}
Serialization hurts scalability.
Context switches hurt performance.
Contended locking causes both. Avoid it!
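private final Lock[] locks = new ReentrantLock[16];
{ for (int i = 0; i < locks.length; i++) locks[i] = new ReentrantLock(); } // the array starts out all null
public User getUser(int id) {
Lock lock = locks[Math.floorMod(id, locks.length)]; // valid index even for negative ids
lock.lock();
try {
...
} finally {
lock.unlock(); // always release the stripe
}
}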